package org.spider.api.concurrent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spider.api.domain.utilDomain.SpiderNode;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 父子线程池，父线程提交flow，子线程执行flow中的具体任务
 */
public class SpiderThreadPoolExecutorManager {
    private int maxThreads;
    private ThreadPoolExecutor executor;
    // 采用父子线程，一个父线程对应多个子线程 子线程都属于同一个线程组 前缀都是spider-flow
    private static final ThreadGroup spiderThreadGroup = new ThreadGroup("spider-group");

    /**
     * 线程名称前缀,用来区分前缀
     */
    private static final String THREAD_POOL_NAME_PREFIX = "flow-";
    private final AtomicInteger threadNumber = new AtomicInteger(1); //
    private static Logger logger= LoggerFactory.getLogger(SpiderThreadPoolExecutorManager.class);

    public SpiderThreadPoolExecutorManager(int maxThreads) {
        this.maxThreads = maxThreads;
        // 线程工厂 线程工厂创建线程的方法如下所示，每次都会调用getAndIncrement()
        ThreadFactory factory = r -> new Thread(spiderThreadGroup, r,
                THREAD_POOL_NAME_PREFIX + threadNumber.getAndIncrement());
        this.executor = new ThreadPoolExecutor(maxThreads, maxThreads,
                10, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factory);
    }

    // 提交任务
    public Future<?> submit(Runnable runnable) {
        return this.executor.submit(runnable);
    }

    /**
     * 外部类生成对应的内部类
     * 父子线程pool
     *
     * @param threads
     * @param strategyQueue
     * @return
     */
    public SubThreadPool createSubThreadExecutor(int threads, StrategyQueue strategyQueue) {
//        logger.debug("isShutDown:{} , isTerminated:{}",executor.isShutdown(),executor.isTerminated());
        return new SubThreadPool(Math.min(maxThreads, threads), strategyQueue);
    }

    /**
     * 手写线程池
     */
    public class SubThreadPool  extends Thread{
        /**
         * 线程池大小
         */
        private int threads;

        /**
         * 正在执行中的任务
         */
        private Future<?>[] futures;

        /**
         * 执行中的数量
         */
        private AtomicInteger executingNum = new AtomicInteger(0);

        /**
         *是否运行中(作用)
         */
        private volatile boolean running = true;


        private final StrategyQueue strategyQueue;

        public SubThreadPool(int threads, StrategyQueue strategyQueue) {
//            super(spiderThreadGroup,"subThreadPool");
            this.threads = threads;
            this.futures = new Future[threads];
            this.strategyQueue = strategyQueue;
        }

        // 暂时不需要，因为一个flow对应一个线程池，而且初始化只有一次，
        // 如果关闭了，那么在网页中只能执行一次。后续可以修改初始化方案
        public void awaitTermination() {
            // 直到没有正在运行的任务，则终止
            while (executingNum.get() > 0) {
                removeDoneFuture();
            }
            running = false;
            synchronized (strategyQueue) {  //防止run在等待，无法进入到while(running判断)导致线程无法结束
                strategyQueue.notifyAll();
            }
        }

        /**
         * 获取一个空闲线程的下标
         *
         * @return
         */
        private int index() {
            for (int i = 0; i < threads; i++)
                if (futures[i] == null || futures[i].isDone())
                    return i;
            // 表示不存在
            return -1;
        }

        private void removeDoneFuture() {
            for (int i = 0; i < threads; i++) {
                try {
                    // future不为null 则等wait一段时间(任务完成后返回null,没完成就返回超时异常，则跳过)
                    // submit(Runnable r)返回的future对象，get方法获得的应该是null
                    if (futures[i] != null && futures[i].get(10, TimeUnit.MILLISECONDS) == null) {
                        futures[i] = null;
                    }
                } catch (Throwable t) {
                    // pass
                }
            }
        }

        private void await() {
            while (index() == -1)
                removeDoneFuture();
        }

        /**
         * 异步提交任务
         *
         * @param runnable
         * @param value
         * @param node
         * @param <T>
         * @return 返回一个Future对象
         */
        public <T> Future<T> submitAsync(Runnable runnable, T value, SpiderNode node) {
            logger.debug("当前父线程池活跃数量:{}",executor.getActiveCount());
            executingNum.incrementAndGet();
            Runnable r = () -> {
                try {
                    // 执行任务
                    runnable.run();
                } finally {
                    //完成后就减1
                    executingNum.decrementAndGet();
                }
            };
            SpiderFutureTask<T> futureTask = new SpiderFutureTask<>(r, value, node);
            // 添加到策略队列中
            strategyQueue.add(futureTask);

            //有任务了,通知其他在等待的线程
            synchronized (strategyQueue){
                //通知继续从集合中取任务提交到线程池中
                logger.debug("有任务，唤醒睡眠线程。");
                strategyQueue.notifyAll();
            }
            return futureTask;
        }

        // 等待-唤醒 设计模式
        public void run() {
            logger.debug(this.getName()+" enter while loop...");
            while (running) {
                try {
                    if(this.isInterrupted())
                        break;
//                    logger.debug(this.getName()+" in while loop...");
                    // 加锁，防止队列操作错误
                    synchronized (strategyQueue) {
                        if (strategyQueue.isEmpty()) {
                            // 等待，并让出锁
//                            logger.debug("没有任务存在,等待并让出锁");
                            strategyQueue.wait();
                        }
                    }
                    // 被唤醒后，将任务真正提交。也就是把任务从策略队列中提交
                    while (!strategyQueue.isEmpty()) {
                        SpiderFutureTask<?> futureTask = strategyQueue.get();
                        // 线程池满时的优化策略
                        if (index() == -1 && Thread.currentThread().getThreadGroup() == spiderThreadGroup) {
                            logger.info("线程池{}满,直接执行",super.getName());
                            futureTask.run();
                        }
                        else {
//                            logger.debug("等待有空闲线程再提交");
                            // 有空闲线程时提交
                            await();
                            int index = index();
                            futures[index] = executor.submit(futureTask);
                        }
                    }
                } catch (InterruptedException e) {
//                    logger.debug("submit interrupt :{}",e.getMessage());
                    this.interrupt(); //防止清除打断标记
                }
            }
            logger.debug("exit while loop...");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SpiderThreadPoolExecutorManager manager=new SpiderThreadPoolExecutorManager(64);
        SubThreadPool executor = manager.createSubThreadExecutor(2,new SeqStrategyQueue());
        executor.start();
        Runnable runnable=()-> {
            System.out.println("test===" + Thread.currentThread().getName());
        };
        manager.submit(runnable);
        for (int i = 0; i < 5; i++) {
            executor.submitAsync(runnable,null,null);
        }
//        executor.awaitTermination();
        Thread.sleep(3000);
        executor.interrupt();
        System.out.println("is alive:"+executor.isAlive());

        Thread.sleep(1000);
        System.out.println("is alive:"+executor.isAlive());

        for (int i = 0; i < 5; i++) {
            executor.submitAsync(runnable,null,null);
        }
        System.out.println("is alive:"+executor.isAlive());

    }
}


//        /**
//         * 是否提交任务中
//         */
//        private volatile boolean submitting = false;

//            logger.warn("检查是否要开启submit ,submitting={}",submitting);
//                    logger.warn("下面唤醒等待线程, submitting={}",submitting);

//// balking模式，第一次运行
//            if(!submitting) {
//                    submitting=true;
//                    CompletableFuture.runAsync(this::run);
//                    }